Smart conn #637
Conversation
Conflicts: store
sumwale
left a comment
See some comments. About point #3, I have a doubt as to how that will work properly if we are not ensuring that the commit happens after all tasks are done. Perhaps a separate message at the end, or some accounting on the executor side keeping track of the partitions scheduled on that executor, so that it commits only after all the partitions are done.
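The executor-side accounting idea could be sketched roughly as below. This is a hypothetical illustration using only the Scala standard library; `PartitionTracker` and its method names are made-up for this sketch and are not part of the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical executor-side accounting: count partitions scheduled per
// transaction and signal commit eligibility only when the last one finishes.
class PartitionTracker {
  // txId -> number of partitions still running on this executor
  private val pending = TrieMap.empty[String, AtomicInteger]

  def partitionScheduled(txId: String): Unit =
    pending.getOrElseUpdate(txId, new AtomicInteger(0)).incrementAndGet()

  // Returns true only for the caller that completes the last pending
  // partition; that caller would then perform the snapshot commit.
  def partitionDone(txId: String): Boolean =
    pending.get(txId) match {
      case Some(c) if c.decrementAndGet() == 0 =>
        pending.remove(txId)
        true
      case _ => false
    }
}
```

Under this scheme a task's completion listener would call `partitionDone` and issue the snapshot commit only when it returns true, rather than each task committing unconditionally.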
    SparkShellRDDHelper.snapshotTxId.set(txid)
    val getSnapshotTXId = conn.prepareCall(s"call sys.GET_SNAPSHOT_TXID (?)")
    getSnapshotTXId.registerOutParameter(1, java.sql.Types.VARCHAR)
    getSnapshotTXId.execute
Use empty parens in actions as per Scala convention (and none in getters).
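The convention being requested can be shown with a minimal, self-contained example (a hypothetical class, not from the codebase): side-effecting actions take empty parens, while pure getters take none.

```scala
// Hypothetical example of the Scala parens convention:
// actions (side effects) declare and call with (), getters omit parens.
class Counter {
  private var n = 0
  def increment(): Unit = { n += 1 } // action: empty parens
  def count: Int = n                 // getter: no parens
}
```

Applied to the snippet above, `getSnapshotTXId.execute` would become `getSnapshotTXId.execute()`, since it performs the call rather than reading a value.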
    val txid: String = getSnapshotTXId.getString(1)
    getSnapshotTXId.close()
    SparkShellRDDHelper.snapshotTxId.set(txid)
    logDebug(s"The snapshot tx id is ${txid} and tablename is ${tableName}")
The indentation change looks incorrect; the older one with 2 spaces was proper.
    override def commitTxBeforeTaskCompletion(conn: Option[Connection], context: TaskContext) = {
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => {
        val txId = SparkShellRDDHelper.snapshotTxId.get
        logDebug(s"The txid going to be committed is $txId " + tableName)
        if ((txId ne null) && !txId.equals("null")
          /* && !(tx.asInstanceOf[TXStateProxy]).isClosed() */ ) {
          val ps = conn.get.prepareStatement(s"call sys.COMMIT_SNAPSHOT_TXID(?)")
          ps.setString(1, txId)
          ps.executeUpdate()
          logDebug(s"The txid being committed is $txId")
          ps.close()
          SparkShellRDDHelper.snapshotTxId.set(null)
        }
      }
Shouldn't this be done after all tasks are done? Won't a commit in the middle of another task execution cause trouble?
Yes, in general. But for read operations, if the txState is already being used in an iterator, then even if the state is closed we compare against the stored snapshot.
For write operations this would cause issues if transactions are committed by a task prematurely; however, I haven't yet found a scenario where that happens in this task.
@suranjan
Even then, a read in a colocated join may not even be scheduled (or be scheduled by Spark but not by the OS) while another task commits. In that case, will the two use different snapshots? I don't see what kind of semantics this change is trying to provide.
- What are the semantics of concurrent tasks/threads participating in the
same txn? Couldn't this result in a possible loss of data integrity? Unless
the entire state across all tasks is accounted for as a unit and a single
thread executes the commit protocol.
- Are there implications when Spark is re-executing tasks? i.e., have we
accounted for all cases where tasks could get re-executed by Spark?
Currently, the snapshot is not across tasks. Each task takes its own
snapshot, so snapshot isolation is at the partition level. Once we have a
lead/driver-initiated transaction with a global snapshot implementation, we
can provide a snapshot across tasks.
Because we take the snapshot when a task starts, if a task is re-executed
then the snapshot will also be new.
@suranjan If we are providing partition-level isolation semantics only, as you mention, then what is this PR trying to do using a TX across tasks (which will be one per partition)? What is it trying to solve? This will result in inconsistent semantics.
If multiple tasks are being executed by the same thread, it will use the
same snapshot even after the first task has completed, e.g. a row buffer
scan + column table scan, and similarly a colocated join.
Maybe my understanding of colocated joins is wrong, but in the log I saw
both tables being iterated simultaneously in the same thread on the Spark side.
For this case, in embedded mode we do use the same snapshot. In smart
connector mode, different snapshots were being used due to different
connection objects, and one was being overwritten by the other in the
thread-local variable on the Spark side.
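One way to avoid that thread-local overwrite would be to key the snapshot txid by task id rather than by thread. A minimal sketch using only the Scala standard library; `SnapshotRegistry` and its methods are made-up names for illustration, not the PR's actual fix.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical per-task snapshot registry: two tasks multiplexed onto the
// same thread get distinct entries, so neither clobbers the other's txid
// (the failure mode described above for smart connector mode).
object SnapshotRegistry {
  private val byTask = TrieMap.empty[Long, String]

  def set(taskId: Long, txId: String): Unit = byTask.put(taskId, txId)
  def get(taskId: Long): Option[String] = byTask.get(taskId)
  def clear(taskId: Long): Unit = byTask.remove(taskId)
}
```

In Spark the key could come from something like the task attempt id, so a task completion listener commits and clears only its own entry.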
Row buffer scan + column buffer scan of a single bucket is a single task. Likewise, a colocated join of two buckets is a single task. A single thread executing multiple tasks in the same job should take separate snapshots for reads to keep clean semantics. Even for writes, row buffer writes and column buffer writes are all in the same task, though in different code segments. Anyway, since this is already merged, let's discuss what to do going forward later (unless this change causes trouble elsewhere).
Ok.
The pull request covers both:
1. Colocated join of two buckets (in the smart connector case).
2. A single thread executing multiple tasks in the same job.
Without this merge, the 1st was broken, as multiple scans in smart connector
mode were using multiple snapshots.
Changes proposed in this pull request
Patch testing
precheckin
ReleaseNotes.txt changes
(Does this change require an entry in ReleaseNotes.txt? If yes, has it been added to it?)
Other PRs
(Does this change require changes in other projects- store, spark, spark-jobserver, aqp? Add the links of PR of the other subprojects that are related to this change)